
[HUDI-8518] Fix RLI and Secondary index with custom payload or merge mode #12525

Open · wants to merge 7 commits into base: master
Conversation

@codope (Member) commented Dec 19, 2024

Change Logs

  • Ensure deletes via custom payload or merger get synced to RLI (HUDI-8741)
  • Fix secondary index updates for event_time and custom merge modes (HUDI-8518)
  • Ensure secondary index updates are done in a distributed manner, without collecting any record keys or maps in the driver (HUDI-8740)

Impact

Correctness of the record-level index and secondary index with custom payloads/mergers, plus a performance optimization for secondary index updates.

Risk level (write none, low, medium or high below)

medium

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of the configs is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instructions to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Dec 19, 2024
Comment on lines 830 to 841
// For log records, we only need to process deletes. However, deletes may or may not be part of delete blocks (they can also come via a custom merge mode).
// So, we need to process the log files to get the record keys that are deleted. We can then generate RLI delete records for those keys.
// 1. Get all merged record keys - keys deleted by a custom merger outside a delete block will not be present in the merged keys.
// 2. Get all un-merged record keys - this will contain all valid and deleted keys, irrespective of delete block or merge mode.
// 3. Get deleted record keys - the un-merged keys minus the merged keys.
Set<String> mergedRecordKeys = getRecordKeys(logFilePaths, dataTableMetaClient,
    finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, true, engineType);
Set<String> unMergedRecordKeys = getRecordKeys(logFilePaths, dataTableMetaClient,
    finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, false, engineType);
Set<String> deletedRecordKeys = new HashSet<>(unMergedRecordKeys);
deletedRecordKeys.removeAll(mergedRecordKeys);
return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
@codope (Member, Author):
This is the logic to process deletes across all log files irrespective of merge mode. It is going to be a bit costly because we scan the log records both merged and un-merged to figure out the deletes.

finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, true, engineType);
Set<String> unMergedRecordKeys = getRecordKeys(logFilePaths, dataTableMetaClient,
finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, false, engineType);
Set<String> deletedRecordKeys = new HashSet<>(unMergedRecordKeys);
Contributor:

won't this also give us deleted records from a previous log file even if not touched in this commit of interest?

@codope (Member, Author):

I have changed the RLI logic as discussed. However, for SI the logic is the same, i.e. we still read from SI (to figure out delete records), but it avoids any collection on the driver. SI updates now happen entirely through distributed task execution.

@codope codope changed the title [HUDI-8518][HUDI-8741] Fix RLI and Secondary index with custom payload or merge mode [HUDI-8518] [HUDI-8741] Fix RLI and Secondary index with custom payload or merge mode Dec 23, 2024
@codope codope changed the title [HUDI-8518] [HUDI-8741] Fix RLI and Secondary index with custom payload or merge mode [HUDI-8518] Fix RLI and Secondary index with custom payload or merge mode Dec 23, 2024
@nsivabalan (Contributor) left a comment:

Reviewed source code. Will review tests once feedback is addressed.

  // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
  // the secondary index partition for each of these keys.
- List<String> keysToRemove = HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(),
-     dataMetaClient, instantTime);
+ HoodieData<String> keysToRemove = getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(), dataMetaClient, instantTime);
Contributor:

Trying to see if we should optimize only to fetch keys that are part of delete log blocks here?

HoodieMetadataFileSystemView fsView = getMetadataView();
fsView.loadPartitions(partitionFilePairs.stream().map(Pair::getKey).collect(Collectors.toList()));
HoodieData<HoodieRecord> insertRecords =
readSecondaryKeysFromBaseFiles(engineContext, partitionFilePairs, parallelism, this.getClass().getSimpleName(), dataMetaClient, getEngineType(), indexDefinition, fsView);
Contributor:

do we have a tracking ticket to support inserts to log files? For e.g., with bucket index, we should be able to create a secondary index. Maybe today we rely on RLI, but eventually we need to support SI for any table and index type.

@codope (Member, Author):

HUDI-8793. IMO, we should take it up with the metadata DAG rewrite - #12269.
Even now, I am not very happy with the double work of getting inserts/updates/deletes, once in the write handle and then again while updating the metadata table. It's basically merging all log records in every commit, which neutralizes the advantages of MOR.

List<List<String>> batches = new ArrayList<>();
List<String> currentBatch = new ArrayList<>();

while (iter.hasNext()) {
Contributor:

why can't we do count and then do repartition(count/batchSize)?

@codope (Member, Author):

Count will trigger the DAG, and repartition could trigger shuffles, right? I am just doing a simple transformation to batch record keys.
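To illustrate the point (not the PR's actual code), here is a minimal sketch of the batching idea, assuming the record keys arrive as a plain Iterator<String> inside a partition-level transformation; the helper name batchRecordKeys and the batchSize parameter are illustrative:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class RecordKeyBatching {

  // Groups record keys into fixed-size batches in a single pass over the iterator,
  // so no count() action and no repartition() shuffle is needed.
  static List<List<String>> batchRecordKeys(Iterator<String> iter, int batchSize) {
    List<List<String>> batches = new ArrayList<>();
    List<String> currentBatch = new ArrayList<>(batchSize);
    while (iter.hasNext()) {
      currentBatch.add(iter.next());
      if (currentBatch.size() == batchSize) {
        batches.add(currentBatch);
        currentBatch = new ArrayList<>(batchSize);
      }
    }
    if (!currentBatch.isEmpty()) {
      batches.add(currentBatch); // flush the trailing partial batch
    }
    return batches;
  }
}

Because this is a narrow, per-partition transformation, individual keys never have to be counted or collected on the driver, which is the property being discussed above.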

- // ignore log file data blocks.
- return new ArrayList<HoodieRecord>().iterator();
+ checkState(writeStat instanceof HoodieDeltaWriteStat, "Log file should be associated with a delta write stat");
+ List<String> logFilePaths = ((HoodieDeltaWriteStat) writeStat).getLogFiles().stream()
Contributor:

we need to fix L 806 to 814.

checkState(writeStat instanceof HoodieDeltaWriteStat, "Log file should be associated with a delta write stat");
List<String> logFilePaths = ((HoodieDeltaWriteStat) writeStat).getLogFiles().stream()
    .map(logFile -> new StoragePath(new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPartitionPath()), logFile).toString())
    .collect(toList());
Contributor:

also, don't we need to group by fileSlice for all write stats?
For e.g., if more than 1 log file is added to the same file slice, will the current record generation still be right?

@codope (Member, Author):

good point! Log files can roll over and we can have more than 1 log file in the same slice. I will update this.
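A rough sketch of the grouping being described, assuming HoodieWriteStat exposes getPartitionPath() and getFileId() and using Hudi's Pair utility; the surrounding helper is illustrative, not the PR's code:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.collection.Pair;

public class WriteStatGrouping {

  // Groups write stats by (partitionPath, fileId) so that all log files rolled over
  // within the same file slice are handed to the record generator together,
  // instead of producing one record set per log file.
  static Map<Pair<String, String>, List<HoodieWriteStat>> groupByFileSlice(List<HoodieWriteStat> writeStats) {
    return writeStats.stream()
        .collect(Collectors.groupingBy(stat -> Pair.of(stat.getPartitionPath(), stat.getFileId())));
  }
}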

.collect(Collectors.toSet());

// Calculate deleted keys in the current log file
Set<String> deletedKeysInCurrentLog = currentLogRecords.entrySet().stream()
Contributor:

I thought we could simplify this.

  1. parse previous log files to generate {recordKey -> isDeleted}
  2. parse entire file slice (including inflight) to generate {recordKey -> isDeleted}

we are interested in deleted record keys w/ current log file. So, we could do something like

for every valid entry in (1)
   if deleted in (2) => add to deletedRecordKeyList. 

@codope (Member, Author):

Yeah, I thought about doing it that way but could not convince myself that it will always be correct for log files across all kinds of merge strategies. It works perfectly well for base files.

My line of thinking is as follows:

1. Include keys that are valid in prev and deleted in current
2. Exclude keys that are deleted in prev but revived in current
3. Include keys that are newly deleted in current

For example, let's say we have 4 keys spread across 3 log files as below. 
Here, `Prev` means 2 previous log files and `Current` means entire slice including the third inflight log file.
True/False indicates `isDeleted`, and `-` means key not present.

Key   Prev    Current
k1    false   true
k2    true    false
k3      -     true
k4    false    -

If we go by the simpler logic, then it will only consider [k1, k4] (as these two are the only valid entries in the previous log files), and the final deletedRecordKeyList will contain only k1. However, I think the expected deleted keys should be [k1, k3], right?
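To make the example concrete, here is a small self-contained sketch (the maps and the rule below are illustrative, not the PR's code) that encodes the Prev/Current table and marks a key as deleted only when the full slice says it is deleted and the previous log files did not already say so; it prints [k1, k3]:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DeletedKeysExample {

  public static void main(String[] args) {
    // recordKey -> isDeleted from the previous (already committed) log files
    Map<String, Boolean> prev = new HashMap<>();
    prev.put("k1", false);
    prev.put("k2", true);
    prev.put("k4", false);    // k3 is absent in prev

    // recordKey -> isDeleted from the entire file slice, including the inflight log file
    Map<String, Boolean> current = new HashMap<>();
    current.put("k1", true);
    current.put("k2", false); // revived in the current log file
    current.put("k3", true);  // newly deleted in the current log file
                              // k4 is absent, i.e. untouched and still valid

    // Deleted now = deleted in the full slice and not already deleted before.
    // This covers rules 1 and 3 above; rule 2 (revived keys) is excluded
    // because such keys are no longer deleted in `current`.
    List<String> deletedKeys = current.entrySet().stream()
        .filter(e -> e.getValue() && !prev.getOrDefault(e.getKey(), false))
        .map(Map.Entry::getKey)
        .sorted()
        .collect(Collectors.toList());

    System.out.println(deletedKeys); // [k1, k3]
  }
}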

@@ -2241,7 +2377,7 @@ public static HoodieData<HoodieRecord> readSecondaryKeysFromBaseFiles(HoodieEngi
  } else {
    readerSchema = tableSchema;
  }
- return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, readerSchema, partition, dataFilePath, indexDefinition,
+ return createSecondaryIndexGenerator(metaClient, engineType, new ArrayList<>(logFilePaths), readerSchema, partition, dataFilePath, indexDefinition,
Contributor:

not sure if we need this fix if we are not getting rid of the separate processing of entries to be deleted from SI.

I am talking about

 // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
    // the secondary index partition for each of these keys.
    HoodieData<String> keysToRemove = getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(), dataMetaClient, instantTime);

    HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
    // Fetch the secondary keys that each of the record keys ('keysToRemove') maps to
    HoodiePairData<String, String> recordKeySecondaryKeyMap =
        metadata.getSecondaryKeys(keysToRemove, indexDefinition.getIndexName(), dataWriteConfig.getMetadataConfig().getSecondaryIndexParallelism());
    HoodieData<HoodieRecord> deleteRecords = recordKeySecondaryKeyMap.map(
        (recordKeyAndSecondaryKey) -> HoodieMetadataPayload.createSecondaryIndexRecord(recordKeyAndSecondaryKey.getKey(), recordKeyAndSecondaryKey.getValue(), indexDefinition.getIndexName(), true));

In HoodieBackedTableMetadataWriter.getSecondaryIndexUpdates

let's sync up

@codope (Member, Author):

Actually, this is a remnant of one of the previous commits. You're right, we may not need this. I'll check and remove it if not needed.

@apache apache deleted a comment from hudi-bot Dec 25, 2024
@hudi-bot commented:

CI report:

Bot commands supported by @hudi-bot:
  • @hudi-bot run azure: re-run the last Azure build
